package main

import (
	"errors"
	"fmt"
	"github.com/gogf/gf/util/gconv"
	"io"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"context"
	"github.com/segmentio/kafka-go"
)

func main() {
	topic := "my-topic123" // TODO 测试用的topic...
	brokers := []string{"localhost:9092"}

	config := kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  "group_id", // TODO 测试...
		MaxWait:  1 * time.Second,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	}

	reader := kafka.NewReader(config)
	defer reader.Close()

	for {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("Error reading message: %v\n", err)
			// Notice 判断是网络问题～
			if isTransientNetworkError(err) {
				log.Println("Network error, reconnecting...........")
				time.Sleep(5 * time.Second) // Wait before reconnecting
				continue
			}

			// TODO 其他情况 return，这也意味着不会再消费消息了～～～
			break
		} else {
			fmt.Printf("Received message!!: partition: %v, offset: %v, msg.Value: %v\n", m.Partition, m.Offset, gconv.String(m.Value))
		}

	}

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	log.Println("Exiting...")
}

// Notice kafka-go源码 私有函数 挪过来
// TODO 实际中需要新增更多重试的情况～～
func isTransientNetworkError(err error) bool {
	return errors.Is(err, io.ErrUnexpectedEOF) ||
		errors.Is(err, syscall.ECONNREFUSED) ||
		errors.Is(err, syscall.ECONNRESET) ||
		errors.Is(err, syscall.EPIPE)
}
